package com.tomato.study.kafka.provider.provider;

import com.alibaba.fastjson.JSON;
import com.tomato.study.kafka.core.constant.KafkaConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

/**
 * 生产者
 *
 * @author lizhifu
 * @date 2022/3/22
 */
@Component
@Slf4j
public class KafkaProvider {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * 发后即忘(fire-and-forget)
     * 只管发送消息，并不需要关心消息是否发送成功。
     * 其本质上也是一种异步发送的方式，消息先存储在缓冲区中，达到设定条件后再批量进行发送。
     * 这是 kafka 吞吐量最高的方式，但同时也是消息最不可靠的方式，因为对于发送失败的消息并没有做任何处理，某些异常情况下会导致消息丢失。
     * @param obj
     */
    public void sendFireAndForget(Object obj){
        String obj2String = JSON.toJSONString(obj);
        log.info("sendFireAndForget 准备发送消息为：{}", obj2String);
        kafkaTemplate.send(KafkaConstant.TOPIC_TEST, obj);
    }
    /**
     * 发送消息
     * @param obj
     */
    public void send(Object obj) {
        String obj2String = JSON.toJSONString(obj);
        log.info("send 准备发送消息为：{}", obj2String);
        // 发送消息
        // Kafka根据topic对消息进行归类，发布到Kafka集群的每条消息都需要指定一个topic
        // topic默认只有一个分区，分区也没有副本
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(KafkaConstant.TOPIC_TEST, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                log.info(KafkaConstant.TOPIC_TEST + " - 生产者 发送消息失败：" + throwable.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                log.info(KafkaConstant.TOPIC_TEST + " - 生产者 发送消息成功：" + stringObjectSendResult.toString());
            }
        });
    }

}
